package org.example.table;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.NumberUtil;
import cn.hutool.core.util.ObjectUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.example.domin.DataDay;
import org.example.utils.JDBCUtils;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
 * @author lwc
 * @description: TODO
 * @date 2023/12/21 20:41
 */
@Slf4j
public class ReadCurveDws {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("e_mp_read_curveTable");
        SparkSession spark = null;
        if (args.length >= 1) {
            spark = SparkSession.builder().appName("e_mp_read_curveTable").config(sparkConf)
//                    .config("spark.master", "local[*]")  // 使用本地模式，[*]表示使用所有可用的核心
                    .enableHiveSupport().getOrCreate();
            log.info("加上了local[*]");
        } else {
            spark = SparkSession.builder().appName("e_mp_read_curveTable").config(sparkConf).config("spark.master", "local[*]")  // 使用本地模式，[*]表示使用所有可用的核心
                    .enableHiveSupport().getOrCreate();
        }
        Data(spark, sparkConf);
        spark.stop();
    }


    public static void Data(SparkSession spark, SparkConf sparkConf) {
        log.info("=================开始查询e_mp_read_curveTable数据===================");
        String startDate = DateUtil.beginOfDay(DateUtil.date()).toString("yyyyMMdd");
        String endDate = DateUtil.offsetDay(DateUtil.parse(DateUtil.beginOfDay(DateUtil.date()).toDateStr()), -1).toString("yyyyMMdd");

        String table = sparkConf.get("spark.app.table", "e_mp_read_day");
        String startTime = sparkConf.get("spark.app.startTime", endDate);
        String endTime = sparkConf.get("spark.app.endTime", startDate);
        String dwsMeter = sparkConf.get("spark.app.dwsMeter", "t_bus_meterpointmng");
        String dwsTable = sparkConf.get("spark.app.dwsTable", "t_work_202302data_day");
        String url = sparkConf.get("spark.app.url", "jdbc:mysql://localhost:3306/test");
        String username = sparkConf.get("spark.app.username", "root");
        String password = sparkConf.get("spark.app.password", "9XME3z94xs9nhCj");

        log.info("mysql 连接的参数连接：{},表名:{},用户:{}，密码：{}", url, dwsMeter, username, password);
        log.info("hive 参数，表名：{},开始时间：{},结束时间：{}", table, startDate, endDate);

        Dataset<Row> databases = spark.sql("show databases");
        databases.show();
        try {
            //查询mysql meter
            Dataset<Row> mysqlData = spark.read().format("jdbc").option("url", url).option("dbtable", "(select dev_id from " + dwsMeter + ") as temp").option("user", username).option("password", password).load();
            mysqlData.show();

            //查询hive
            StringBuilder hiveSql = new StringBuilder();
            hiveSql.append("select * from ods_amr20_hbase.").append(table).append(" where ds >= '").append(startTime).append("'").append(" and ds <='").append(endTime).append("'");
            log.info("执行的hiveSql为：{}", hiveSql);
            Dataset<Row> hiveData = spark.sql(hiveSql.toString());
            //两个表进行join  操作
            Dataset<Row> rowDataset = mysqlData.join(hiveData, mysqlData.col("dev_id").equalTo(hiveData.col("dev_id")));

            JavaRDD<DataDay> tobjLdHourVaJavaRDD = rowDataset.javaRDD().flatMap((FlatMapFunction<Row, DataDay>) row -> convertRowToDevTimeData(row, table).iterator());
//            // 最终得到合并后的 JavaRDD
            JavaRDD<DataDay> resultRDD = tobjLdHourVaJavaRDD.map(data -> {
                DataDay resultData = new DataDay();
                resultData.setF_measurement_points(data.getF_measurement_points());
                resultData.setF_data_collection_time(data.getF_data_collection_time());
                resultData.setF_key_name(data.getF_key_name());
                resultData.setF_data_input_time(data.getF_data_input_time());
                resultData.setF_delete(0L);
                resultData.setData_e0(data.getData_e0());
                resultData.setData_e1(data.getData_e1());
                resultData.setData_e2(data.getData_e2());
                resultData.setData_e3(data.getData_e3());
                resultData.setData_e4(data.getData_e4());
                resultData.setF_data_input_time(DateUtil.date().toDateStr());
//                resultData.setF_key_name(data._1.get);
                return resultData;
            });

            Dataset<DataDay> dataset = spark.createDataset(resultRDD.rdd(), Encoders.bean(DataDay.class));

            dataset.show();
            log.info("保存前进行删除操作，");
            Connection connection = JDBCUtils.getConnection(url, username, password);
            StringBuilder deleteSql = new StringBuilder("delete from " + dwsTable + " where f_data_collection_time <='" + startDate + "' and f_data_collection_time >='" + endDate + "'");
            deleteSql.append(" and f_key_name in ('10139B30','10139B10','10139B20','10139B40','10139D30','10139D40','10139D50','10139D60')");
            log.info("删除的语句为：{}", deleteSql);
            boolean execute = connection.createStatement().execute(deleteSql.toString());
            log.info("删除结果为：{}", execute);
            dataset.write().format("jdbc").option("url", url).option("dbtable", dwsTable).option("user", username).option("password", password).mode(SaveMode.Append).save();


        } catch (Exception e) {
            e.printStackTrace();
            log.error("出现错误：{}", e.getMessage());
        }

        log.info("=================开始查询E_MP_U_CURVE 结束===================");
    }


    /**
     * 组装数据
     *
     * @param row
     * @param dwsTable
     * @return
     */
    public static List<DataDay> convertRowToDevTimeData(Row row, String dwsTable) {
        List<DataDay> devTimeDataList = new ArrayList<>();
        String devId = getRowObject(row, "dev_id").toString();
        Object dataDate = getRowObject(row, "ds");
        Object papRColTime = getRowObject(row, "pap_r_col_time");
        //<==        Row: true, 112, 10139C09, 正向有功电能示值, 101, 电表, 3, 日冻结, 1, 电量, kWh, null, 1, 0, 2022-01-26 15:06:46, 2021-12-08 10:00:18, 0, 0
        //<==        Row: true, 117, 10139C19, 反向有功电能示值, 101, 电表, 3, 日冻结, 1, 电量, kWh, null, 1, 0, 2022-01-26 15:06:48, 2021-12-08 10:00:18, 0, 0
        //<==        Row: true, 122, 10139D10, 正向无功电能示值, 101, 电表, 3, 日冻结, 1, 电量, kvarh, null, 1, 0, 2022-01-26 15:06:52, 2021-12-08 10:00:18, 0, 0
        //<==        Row: true, 127, 10139D20, 反向无功电能示值, 101, 电表, 3, 日冻结, 1, 电量, kvarh, null, 1, 0, 2022-01-26 15:06:53, 2021-12-08 10:00:18, 0, 0
        //<==        Row: true, 136, 10139D30, 一象限无功电能示值, 101, 电表, 3, 日冻结, 1, 电量, kvarh, null, 1, 0, 2022-01-26 15:09:12, 2021-12-08 10:00:18, 0, 0
        //<==        Row: true, 141, 10139D40, 四象限无功电能示值, 101, 电表, 3, 日冻结, 1, 电量, kvarh, null, 1, 0, 2022-01-26 15:09:13, 2021-12-08 10:00:18, 0, 0
        //<==        Row: true, 146, 10139D50, 二象限无功电能示值, 101, 电表, 3, 日冻结, 1, 电量, kvarh, null, 1, 0, 2022-01-26 15:09:14, 2021-12-08 10:00:18, 0, 0
        //<==        Row: true, 151, 10139D60, 三象限无功电能示值, 101, 电表, 3, 日冻结, 1, 电量, kvarh, null, 1, 0, 2022-01-26 15:09:15, 2021-12-08 10:00:18, 0, 0
        //反向有功总电能示值  10139B30  ,2、正向有功总电能示值  10139B10,3、反向无功总电能示值   10139B40  4、正向无功总电能示值   10139B20
        //RAP_R_COL_TIME,rap_r,rap_r1,rap_r2,rap_r3,rap_r4,0
        //pap_r_col_time,pap_r,pap_r1,pap_r2,pap_r3,pap_r4,0
        //RRP_R_COL_TIME,rrp_r,rrp_r1,rrp_r2,rrp_r3,rrp_r4,0
        //PRP_R_COL_TIME,prp_r,prp_r1,prp_r2,prp_r3,prp_r4,0
        //反向有功总电能势值
        devTimeDataList.add(getDataDaySource(row, devId, "rap_r", dataDate,"10139C19"));
        devTimeDataList.add(getDataDaySource(row, devId, "pap_r", dataDate,"10139C09"));
        devTimeDataList.add(getDataDaySource(row, devId, "rrp_r", dataDate,"10139D20"));
        devTimeDataList.add(getDataDaySource(row, devId, "prp_r", dataDate,"10139D10"));
        //一、二、三、四 象限  RP1_R:10139D30,RP2_R:10139D50,RP3_R:10139D60,RP4_R:10139D40
        //一象限
        devTimeDataList.add(getrp(row, dataDate, devId,"1","10139D30"));
        devTimeDataList.add(getrp(row, dataDate, devId,"2","10139D50"));
        devTimeDataList.add(getrp(row, dataDate, devId,"3","10139D60"));
        devTimeDataList.add(getrp(row, dataDate, devId,"4","10139D40"));
        return devTimeDataList.stream().filter(data -> ObjectUtil.isNotNull(data)).collect(Collectors.toList());
    }


    /**
     * 获取象限
     *
     * @param row
     * @param dataDate
     * @param s
     * @return
     */
    private static DataDay getrp(Row row, Object dataDate, String type, String keyName, String devId) {
        Object rp1R = getRowObject(row, "rp"+type+"_r");
        if (ObjectUtil.isNotNull(rp1R)&&NumberUtil.isNumber(rp1R.toString())) {
            return DataDay.builder()
                    .f_measurement_points(Long.parseLong(devId))
                    .f_data_collection_time(DateUtil.parse(dataDate.toString()).toDateStr())
                    .f_data_input_time(DateUtil.parse(dataDate.toString()).toDateStr())
                    .f_key_name(keyName)
                    .data_e0(Double.parseDouble(rp1R.toString()))
                    .data_e1(Double.parseDouble(rp1R.toString()))
                    .data_e2(Double.parseDouble(rp1R.toString()))
                    .data_e3(Double.parseDouble(rp1R.toString()))
                    .data_e4(Double.parseDouble(rp1R.toString()))
                    .build();
        }else {
            return null;
        }
    }

    private static DataDay getDataDaySource(Row row, String devId, String pType, Object dataDate,String keyName) {
        Object rapR = getRowObject(row, pType);
        Object papRColTime = getRowObject(row, pType + "_col_time");
        if (ObjectUtil.isNotNull(rapR)&&NumberUtil.isNumber(rapR.toString())) {
            Object rapR1 = getRowObject(row, pType + "1");
            Object rapR2 = getRowObject(row, pType + "2");
            Object rapR3 = getRowObject(row, pType + "3");
            Object rapR4 = getRowObject(row, pType + "4");
            return DataDay.builder()
                    .f_measurement_points(Long.parseLong(devId.toString()))
                    .f_data_input_time(ObjectUtil.isNotNull(papRColTime) ? DateUtil.parse(papRColTime.toString()).toDateStr() : DateUtil.parse(dataDate.toString()).toDateStr())
                    .f_data_collection_time(DateUtil.parse(dataDate.toString()).toDateStr())
                    .f_key_name(keyName)
                    .data_e0(Double.parseDouble(rapR.toString()))
                    .data_e1((ObjectUtil.isNotNull(rapR1)&&NumberUtil.isNumber(rapR1.toString()))?Double.parseDouble(rapR1.toString()):null)
                    .data_e2((ObjectUtil.isNotNull(rapR2)&&NumberUtil.isNumber(rapR2.toString()))?Double.parseDouble(rapR2.toString()):null)
                    .data_e3((ObjectUtil.isNotNull(rapR3)&&NumberUtil.isNumber(rapR3.toString()))?Double.parseDouble(rapR3.toString()):null)
                    .data_e4((ObjectUtil.isNotNull(rapR4)&&NumberUtil.isNumber(rapR4.toString()))?Double.parseDouble(rapR4.toString()):null)
                    .build();
        } else {
            return null;
        }
    }




    /**
     * 取出数据
     *
     * @param data
     * @param point
     * @return
     */
    private static Object getRowObject(Row data, String point) {
        if (!data.schema().getFieldIndex(point).toList().isEmpty() && ObjectUtil.isNotNull(data.getAs(point))) {
            return data.getAs(point);
        } else {
            return null;
        }
    }

}
